package com.atuguigu.flink.Day08;

import com.atuguigu.flink.Day01.Singlesensor.SensorSource;
import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.omg.CORBA.Environment;

import static org.apache.flink.table.api.Expressions.$;

public class Example2 {
    //Flink sql API
    public static void main(String[] args) throws Exception {
        //TODO 1 获取环境流
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //TODO 2 获取表环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
        //TODO 3 引入数据流
        SingleOutputStreamOperator<SendsorReading> stream = env.addSource(new SensorSource()).filter(r -> r.id.equals("sensor_1"));
        // table api
        // 将流转化成动态表: DataStream => Table
        Table table =tableEnvironment.fromDataStream(stream, $("id"),$("temperture").as("temp"),$("timestamp").as("ts"));
        //在动态表中进行查询
        Table tableResult=table.groupBy($("id")).select($("id"), $("temp").avg());


        // 输出结果
        // 将Table转换成DataStream[Row]: Table => DataStream
        // 只要查询中有聚合操作，必须使用toRetractStream
        tableEnvironment.toRetractStream(tableResult, Row.class).print();

        // sql
        // 将流转化成表
        // 将DataStream转换成临时视图，表名是`sensor`
        tableEnvironment.createTemporaryView("sensor",stream,$("id"),$("temperture").as("temp"),$("timestamp").as("ts"));

        // 在动态表上进行查询
        Table sqlResult = tableEnvironment.sqlQuery("SELECT id,AVG(temp) FROM sensor GROUP BY id");

        //将结果表转换成输出流
        tableEnvironment.toRetractStream(sqlResult,Row.class).print();

        env.execute();


    }
}
